Skip to main content

Spark Getting Started Guide

Setup

Setup

To use LakeSoul in Spark, first configure Spark Catalog. LakeSoul uses Apache Spark’s DataSourceV2 API for data source and catalog implementations. Moreover, LakeSoul provides scala table API to extend the capability of LakeSoul table.

Spark 3 Support Matrix

LakeSoulSpark Version
2.2.x-2.4.x3.3.x
2.0.x-2.1.x3.1.x

Spark Shell/SQL

Run spark-shell/spark-sql with the LakeSoulSparkSessionExtension sql extension.

spark-sql --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-2.5.0-spark-3.3.jar

Setup Maven Project

Include maven dependencies in your project:

<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul</artifactId>
<version>2.5.0-spark-3.3</version>
</dependency>
// Scala
import org.apache.spark.sql.SparkSession
import spark.implicits._
import com.dmetasoul.lakesoul.tables.LakeSoulTable

val spark = SparkSession.builder()
.master("local")
.config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
.config("spark.sql.defaultCatalog", "lakesoul")
.getOrCreate()

Create Namespace

First, create a namespace for LakeSoul table, default namespace of LakeSoul Catalog is default.

# Spark SQL
CREATE NAMESPACE lakesoul_namespace;
USE lakesoul_namespace;

Create Table

Create a partitioned LakeSoul table with the clause USING lakesoul:

// Spark SQL
CREATE TABLE lakesoul_table (id BIGINT, date STRING, data STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table'

Primary Key Table

In LakeSoul, a table with primary keys is defined as a hash-partitioned table. To create such a table, use the USING lakesoul clause and specify the TBLPROPERTIES setting, where 'hashPartitions' designates a comma-separated list of primary key column names, and 'hashBucketNum' determines the size or number of hash buckets.

// Spark SQL
CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, date STRING, name STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table'
TBLPROPERTIES (
'hashPartitions'='id',
'hashBucketNum'='2')
// scala
val tablePath= "s3://lakesoul-test-bucket/test_table"
val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name")
df.write
.mode("append")
.format("lakesoul")
.option("rangePartitions","date")
.option("hashPartitions","id")
.option("hashBucketNum","2")
.save(tablePath)

CDC Table

Optionally, a hash-partitioned LakeSoul table has the capability to record Change Data Capture (CDC) data, enabling the tracking of data modifications. To create a LakeSoul table with CDC support, one can utilize the DDL statement for a hash-partitioned LakeSoul table and include an additional TBLPROPERTIES setting specifying the 'lakesoul_cdc_change_column' attribute. This attribute introduces an implicit column that assists the table in efficiently handling CDC information, ensuring precise tracking and management of data changes.

// Spark SQL
CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, date STRING, name STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table'
TBLPROPERTIES(
'hashPartitions'='id',
'hashBucketNum'='2',
'lakesoul_cdc_change_column' = 'op'
)

Insert/Merge Data

To append new data to a non-hash-partitioned table using Spark SQL, use INSERT INTO.

To append new data to a table using DataFrame, use DataFrameWriterV2 API. If this is the first write of the table, it will also auto-create the corresponding LakeSoul table.

// Spark SQL
INSERT INTO TABLE lakesoul_table VALUES (1, '2024-01-01', 'Alice'), (2, '2024-01-01', 'Bob'), (1, "2024-01-02", "Cathy")

To append new data to a hash-partitioned table using Spark SQL, use Merge INTO.

To append new data to a hash-partitioned table using DataFrame, use LakeSoulTable upsert API.

// Spark SQL
// Create source_view
CREATE OR REPLACE VIEW spark_catalog.default.source_view (id , date, data)
AS SELECT (1 as `id`, '2024-01-01' as `date`, 'data' as `data`)

// Merge source_view Into lakesoul_hash_table

MERGE INTO lakesoul_hash_table AS t
USING spark_catalog.default.source_view AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

Update Data

LakeSoul tables can be updated by a DataFrame or using a standard UPDATE statement. To update data to a table using DataFrame, use LakeSoulTable updateExpr API.

// Spark SQL
UPDATE table_namespace.table_name SET name = "David" WHERE id = 2

Delete Data

LakeSoul tables can be removes the records by a DataFrame or using a standard DELETE statement. To delete data to a table using DataFrame, use LakeSoulTable delete API.

// Spark SQL
DELETE FROM lakesoul.lakesoul_namespace.tbl WHERE key =1

Query Data

LakeSoul tables can be queried using a DataFrame or Spark SQL.

// Spark SQL
SELECT * FROM lakesoul_table

Time Travel Query

LakeSoul supports time travel query to query the table at any point-in-time in history or the changed data between two commit time.

// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_cdc_table"
Seq(("range1", "hash1", "insert"), ("range2", "hash2", "insert"), ("range3", "hash2", "insert"), ("range4", "hash2", "insert"), ("range4", "hash4", "insert"), ("range3", "hash3", "insert"))
.toDF("range", "hash", "op")
.write
.mode("append")
.format("lakesoul")
.option("rangePartitions", "range")
.option("hashPartitions", "hash")
.option("hashBucketNum", "2")
.option("shortTableName", "lakesoul_cdc_table")
.option("lakesoul_cdc_change_column", "op")
.save(tablePath)
// record the version of 1st commit
val versionA: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)


val lakeTable = LakeSoulTable.forPath(tablePath)
lakeTable.upsert(Seq(("range1", "hash1-1", "delete"), ("range2", "hash2-10", "delete"))
.toDF("range", "hash", "op"))
// record the version of 2nd commit
val versionB: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)

lakeTable.upsert(Seq(("range1", "hash1-13", "insert"), ("range2", "hash2-13", "update"))
.toDF("range", "hash", "op"))
lakeTable.upsert(Seq(("range1", "hash1-15", "insert"), ("range2", "hash2-15", "update"))
.toDF("range", "hash", "op"))
// record the version of 3rd,4th commits
val versionC: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)

Complete Query

// Scala
spark.sql("SELECT * FROM lakesoul_cdc_table")

Snapshot Query

LakeSoul supports snapshot query for query the table at a point-in-time in history.

// Scala
spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range2")
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, ReadType.SNAPSHOT_READ)
.load(tablePath)

Incremental Query

LakeSoul supports incremental query to obtain a set of records that changed between a start and end time.

// Scala
spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range1")
.option(LakeSoulOptions.READ_START_TIME, versionA)
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, ReadType.INCREMENTAL_READ)
.load(tablePath)

Next steps

Next, you can learn more usage cases about LakeSoul tables in Spark at Spark API docs.